package com.lee.mq.listener;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.lee.service.PointService;
import com.lee.vo.OrderDTO;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;

/***
 * @description: 消费者监听器
 * @author : 青石路
 * @date: 2021/11/13 12:02
 */
@Component
@Slf4j
public class OrderSuccessListener implements MessageListenerConcurrently {

    @Resource
    private PointService pointService;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
        log.info("下单成功，会员积分处理，消息消费开始");
        try{
            for (MessageExt message : msgList) {
                OrderDTO order  = JSONObject.parseObject(message.getBody(), OrderDTO.class);
                log.info("下单成功，会员积分处理，订单信息:{}", JSON.toJSONString(order));
                pointService.add(order);
            }
            log.info("下单成功，会员积分处理，消息消费完成");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }catch (Exception e){
            log.error("下单成功，会员积分处理，消息消费异常。{}",e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}
